Forum Discussion

Art
New Contributor
2 months ago

Duplicate entries after imports

I'm new to 1Password.
I imported my passwords from Bitwarden and from my Google account, and now I have dozens of duplicated entries.
I tried using Watchtower, but it doesn't see any duplicates at all.
The duplicated items differ in their tags and edit times, and maybe that is the problem.

Is there any way to delete the duplicates despite those differences?
Can I tell Watchtower which fields are relevant for comparison (or, even better, which fields aren't)?

Thanks in advance for the help!

6 Replies

  • Hello Art! 👋

    Welcome to 1Password! As AJCxZ0 mentioned, if the duplicate items are exactly the same and in the same vault then you can also use 1Password's Watchtower tool to clean them up.

    However, if they're not exact duplicates, they won't be detected as such. In that case, since you've imported a similar set of items from two different password managers, you'll need to go through your items manually.

    Alternatively, if you still have your items in the other password managers, you could delete all of the imported items in 1Password and start over. Then, re-import your passwords from either Google or Bitwarden and then manually add any missing details from the other password manager. 
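
    If you're comfortable with the command line, a rough dry-run sketch along these lines could bulk-archive the imported copies before you start over. It's only a sketch, not an official 1Password tool: it assumes the imports were tagged with a tag starting with "Imported" (the script later in this thread makes the same assumption), and it uses only "op item list" and "op item delete --archive".

    #!/usr/bin/env python3
    """Archive every item tagged "Imported..." in one vault (dry-run unless --apply is passed).

    A minimal sketch, not an official 1Password tool. It assumes imported items
    carry a tag starting with "Imported" and relies on only two CLI calls:
    `op item list --vault ... --format json` and `op item delete --archive <id>`.
    """
    import json
    import subprocess
    import sys

    VAULT = "My Vault"               # hypothetical vault name; adjust to yours
    APPLY = "--apply" in sys.argv    # dry-run by default

    # List every item in the vault as JSON.
    out = subprocess.run(
        ["op", "item", "list", "--vault", VAULT, "--format", "json"],
        check=True, capture_output=True,
    ).stdout
    items = json.loads(out or "[]")

    for item in items:
        tags = item.get("tags") or []
        if not any(t.lower().startswith("imported") for t in tags):
            continue
        label = f"{item.get('title')} [{item['id']}]"
        if APPLY:
            # Move the item to the Archive (recoverable, unlike a permanent delete).
            subprocess.run(["op", "item", "delete", "--archive", item["id"]], check=True)
            print(f"Archived: {label}")
        else:
            print(f"DRY-RUN would archive: {label}")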

    -Dave

  • Art
    New Contributor

    Is there any limit on posts?
    I'm trying to post the script here, but something fails when I hit "Reply"...

    • 1P_Timothy
      Community Manager

      Hi Art. It looks like the post with the script was caught in our spam filter. I've just pushed it through, so it should be visible now.

  • Art
    New Contributor

    #!/usr/bin/env python3
    """
    Merge duplicate 1Password items within a vault (by normalized title).

    Highlights
    - Dry-run by default; use --apply to make changes
    - JSON backups of all involved items
    - Merges:
      • URLs (union)
      • Notes (append summaries + conflicts)
      • Loose (non-section) custom fields: add/update, conflicts -> notes
      • Custom sections on duplicates: summarized into notes (redacted by default)
    - Correct archiving via: op item delete --archive <id>
    - Multi-account support via --account
    - Robust TAG FIX:
      • Compute cleaned tags (de-dupe, drop "Imported*")
      • Force-set tags via JSON template (most reliable across CLI versions)
    """
    import argparse
    import json
    import re
    import subprocess
    import sys
    import tempfile
    from datetime import datetime
    from pathlib import Path
    from typing import Any, Dict, List, Set, Tuple, Optional

    # ---------------------- Global account flag ----------------------
    OP_ACCOUNT: Optional[str] = None

    def _with_account(args: List[str]) -> List[str]:
        """Append --account if provided."""
        return args + (["--account", OP_ACCOUNT] if OP_ACCOUNT else [])

    # ---------------------- Subprocess / op helpers ------------------
    def run_op(args: List[str], input_bytes: Optional[bytes] = None) -> subprocess.CompletedProcess:
        """Run an `op` command and return the completed process. Raises on nonzero exit."""
        cp = subprocess.run(
            ["op"] + _with_account(args),
            input=input_bytes,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
        if cp.returncode != 0:
            raise RuntimeError(f"op {' '.join(args)} failed: {cp.stderr.decode(errors='ignore')}")
        return cp

    def op_json(args: List[str]) -> Any:
        """Run `op ... --format json` and parse JSON (empty -> None)."""
        cp = run_op(args + ["--format", "json"])
        out = cp.stdout.decode()
        return json.loads(out or "null")

    # ---------------------- Utilities -------------------------------
    def normalize_title(title: str) -> str:
        """Lowercase + collapse whitespace for grouping duplicates by title."""
        return re.sub(r"\s+", " ", (title or "")).strip().casefold()

    def parse_time(iso_str: Optional[str]) -> datetime:
        if not iso_str:
            return datetime.min
        try:
            s = iso_str.rstrip("Z")
            return datetime.fromisoformat(s)
        except Exception:
            return datetime.min

    def list_items(vault: str) -> List[Dict[str, Any]]:
        return op_json(["item", "list", "--vault", vault]) or []

    def get_item(item_id: str) -> Dict[str, Any]:
        return op_json(["item", "get", item_id]) or {}

    def backup_item_json(item: Dict[str, Any], backup_dir: Path) -> None:
        backup_dir.mkdir(parents=True, exist_ok=True)
        title = (item.get("title") or "untitled").strip().replace("/", "_")
        item_id = item.get("id") or "unknown"
        fname = backup_dir / f"{title}__{item_id}.json"
        with fname.open("w", encoding="utf-8") as f:
            json.dump(item, f, ensure_ascii=False, indent=2)

    def union_urls(item: Dict[str, Any]) -> Set[str]:
        urls = set()
        for u in item.get("urls") or []:
            href = u.get("href") if isinstance(u, dict) else None
            if href:
                urls.add(href.strip())
        return urls

    def get_tags_list(item: Dict[str, Any]) -> List[str]:
        tags: List[str] = []
        for t in item.get("tags") or []:
            if isinstance(t, str):
                tags.append(t)
        return tags

    def union_tags(item: Dict[str, Any]) -> Set[str]:
        return set(get_tags_list(item))

    # ---------------------- Loose custom fields ---------------------
    FieldInfo = Tuple[str, str, str]  # (id, label, value)

    def extract_loose_custom_fields(item: Dict[str, Any]) -> Dict[str, FieldInfo]:
        """
        Return mapping canonical_label -> (id, label, value) for fields NOT in any section.
        Canonical label = first non-empty of label, id. Ignore empty values to reduce noise.
        """
        result: Dict[str, FieldInfo] = {}
        for f in item.get("fields") or []:
            if f.get("section"):
                continue
            fid = (f.get("id") or "").strip()
            label = (f.get("label") or "").strip()
            value = f.get("value")
            if value is None:
                continue
            if isinstance(value, str) and value.strip() == "":
                continue
            canonical = (label or fid).strip()
            if not canonical:
                continue
            result[canonical] = (fid or label or canonical, label or fid or canonical, str(value))
        return result

    def plan_field_edits(
        keeper_full: Dict[str, Any],
        merged_fields: Dict[str, FieldInfo],
    ) -> Tuple[List[Tuple[str, str]], List[Dict[str, str]]]:
        """
        Decide which fields to update vs add on the keeper.
        Returns:
        - updates: list of (id_or_label, value) -> for --field "id=value"
        - additions: list of dicts -> for --add-field "type=STRING,label=...,value=..."
        """
        keeper_map = extract_loose_custom_fields(keeper_full)
        updates: List[Tuple[str, str]] = []
        additions: List[Dict[str, str]] = []
        for canonical, (fid, label, value) in merged_fields.items():
            if canonical in keeper_map:
                k_fid, k_label, k_value = keeper_map[canonical]
                if k_value != value:
                    updates.append((k_fid or k_label or canonical, value))
            else:
                additions.append({"label": label or canonical, "value": value, "type": "STRING"})
        return updates, additions

    # ---------------------- Sectioned fields → notes -----------------
    SENSITIVE_TYPES = {"CONCEALED", "OTP", "PASSWORD", "SSH_PRIVATE_KEY"}
    SENSITIVE_LABEL_RE = re.compile(r"(password|secret|token|key|otp|api|private)", re.I)

    def _sanitize_value(val: Any, max_len: int = 400) -> str:
        s = str(val)
        s = s.replace("\r\n", "\n").replace("\r", "\n")
        if len(s) > max_len:
            s = s[:max_len] + "…"
        return s

    def summarize_sectioned_fields_for_notes(
        item: Dict[str, Any], include_values: bool = False
    ) -> Optional[str]:
        """
        Summarize custom sections & fields from this item.
        Redacts sensitive values unless include_values=True.
        Returns None if there are no sectioned fields.
        """
        fields = item.get("fields") or []
        if not fields:
            return None
        sec_label_by_id: Dict[str, str] = {}
        for sec in item.get("sections") or []:
            sid = sec.get("id")
            lab = sec.get("label")
            if sid:
                sec_label_by_id[sid] = lab or sid
        per_sec: Dict[str, List[Dict[str, Any]]] = {}
        for f in fields:
            sec = f.get("section")
            if not sec or not isinstance(sec, dict) or not sec.get("id"):
                continue
            sid = sec["id"]
            per_sec.setdefault(sid, []).append(f)
        if not per_sec:
            return None
        title = item.get("title") or "(untitled)"
        lines: List[str] = []
        lines.append(f"---")
        lines.append(f"Custom sections (from archived item '{title}'):")
        for sid, flist in per_sec.items():
            sec_label = sec_label_by_id.get(sid, sid)
            lines.append(f" [Section] {sec_label} ({sid})")
            for f in flist:
                f_label = (f.get("label") or f.get("id") or "").strip() or "(unnamed)"
                f_type = (f.get("type") or "UNKNOWN").strip()
                f_val = f.get("value")
                is_sensitive = (f_type in SENSITIVE_TYPES) or SENSITIVE_LABEL_RE.search(f_label or "") is not None
                show = _sanitize_value(f_val) if (include_values and not is_sensitive) else ("[REDACTED]" if is_sensitive else _sanitize_value(f_val))
                lines.append(f" - {f_label} [{f_type}]: {show}")
        return "\n".join(lines).strip()

    # ---------------------- Escaping for CLI specs -------------------
    def _escape_field_value_for_cli(value: str) -> str:
        s = str(value)
        s = s.replace("\\", "\\\\")
        s = s.replace(",", "\\,")
        s = s.replace("=", "\\=")
        s = s.replace("\n", "\\n")
        return s

    def _escape_add_field_spec(label: str, value: str, ftype: str = "STRING") -> str:
        def esc(s: str) -> str:
            s = s.replace("\\", "\\\\").replace(",", "\\,").replace("=", "\\=").replace("\n", "\\n")
            return s
        return f"type={esc(ftype)},label={esc(label)},value={esc(value)}"

    # ---------------------- Tag cleanup helpers ---------------------
    IMPORTED_RE = re.compile(r"^\s*imported\b", re.I)

    def _normalize_tag_basic(tag: str) -> str:
        return re.sub(r"\s+", " ", tag or "").strip()

    def clean_tags(keeper_tags: List[str], candidate_tags: Set[str]) -> List[str]:
        """
        Deduplicate tags case-insensitively, prefer keeper's original casing,
        and drop any tag that starts with 'Imported' (case-insensitive).
        Returns a sorted (casefold) list.
        """
        out_map: Dict[str, str] = {}
        for t in keeper_tags:
            t_norm = _normalize_tag_basic(t)
            if not t_norm or IMPORTED_RE.match(t_norm):
                continue
            out_map[t_norm.casefold()] = t_norm
        for t in candidate_tags:
            t_norm = _normalize_tag_basic(t)
            if not t_norm or IMPORTED_RE.match(t_norm):
                continue
            key = t_norm.casefold()
            if key not in out_map:
                out_map[key] = t_norm
        return sorted(out_map.values(), key=lambda s: s.casefold())

    def force_set_tags_via_template(item_id: str, final_tags: List[str], apply: bool) -> None:
        """
        Force tags by editing the *item JSON* via template (robust across CLI versions):
        1) get item JSON
        2) set item['tags'] = final_tags
        3) op item edit <id> --template=<tmpfile>
        4) verify
        """
        if not apply:
            print(f"DRY-RUN FORCE-TAGS for {item_id}: {final_tags}")
            return
        item = get_item(item_id)  # current state after other edits
        item["tags"] = final_tags  # exact set
        with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8", delete=False, suffix=".json") as tf:
            json.dump(item, tf, ensure_ascii=False)
            tf.flush()
            run_op(["item", "edit", item_id, f"--template={tf.name}"])
        # verify
        updated = get_item(item_id)
        updated_tags = [t for t in (updated.get("tags") or []) if isinstance(t, str)]
        if sorted([t.casefold() for t in updated_tags]) != sorted([t.casefold() for t in final_tags]):
            raise RuntimeError(
                f"Tag enforcement failed for {item_id}. Expected {final_tags}, got {updated_tags}"
            )

    # ---------------------- Editing / applying changes ---------------
    def edit_item_merge(
        keeper_id: str,
        urls_to_add: List[str],
        notes_to_append: str,
        field_updates: List[Tuple[str, str]],
        field_additions: List[Dict[str, str]],
        apply: bool
    ) -> None:
        """
        Apply non-tag merges to the keeper via `op item edit`.
        (Tags are handled separately via template for reliability.)
        """
        keeper = get_item(keeper_id)
        current_notes = keeper.get("notesPlain") or ""
        merged_notes = current_notes
        if notes_to_append.strip():
            merged_notes = (current_notes + ("\n\n" if current_notes else "") + notes_to_append).strip()
        args = ["item", "edit", keeper_id]
        # Add URLs
        for u in urls_to_add:
            args += ["--url", u]
        # Notes
        if merged_notes != current_notes:
            args += ["--notes", merged_notes]
        # Field updates (existing fields by id/label)
        for id_or_label, val in field_updates:
            args += ["--field", f"{id_or_label}={_escape_field_value_for_cli(val)}"]
        # Field additions (new loose custom fields)
        for f in field_additions:
            spec = _escape_add_field_spec(f.get("label", ""), f.get("value", ""), f.get("type", "STRING"))
            args += ["--add-field", spec]
        if apply:
            run_op(args)
        else:
            print("DRY-RUN op " + " ".join(args))

    def archive_item(item: Dict[str, Any], apply: bool) -> None:
        """
        Move an item to Archive:
        - op item delete --archive <id>
        - for Document items, also tries: op document delete --archive <id>
        """
        item_id = item.get("id")
        category = (item.get("category") or "").upper()
        args1 = ["item", "delete", "--archive", item_id]
        args2 = ["document", "delete", "--archive", item_id]
        if apply:
            try:
                run_op(args1)
            except Exception:
                if category == "DOCUMENT":
                    run_op(args2)
                else:
                    raise
        else:
            print("DRY-RUN op " + " ".join(args1))
            if category == "DOCUMENT":
                print("DRY-RUN (fallback) op " + " ".join(args2))

    # ---------------------- Merge logic (grouping, keeper) -----------
    def build_groups(items: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
        groups: Dict[str, List[Dict[str, Any]]] = {}
        for it in items:
            key = normalize_title(it.get("title") or "")
            groups.setdefault(key, []).append(it)
        return groups

    def choose_keeper(candidates: List[Dict[str, Any]]) -> Dict[str, Any]:
        def sort_key(it: Dict[str, Any]):
            return (parse_time(it.get("updatedAt")), parse_time(it.get("createdAt")))
        return sorted(candidates, key=sort_key, reverse=True)[0]

    # ---------------------- Main -------------------------------------
    def main():
        parser = argparse.ArgumentParser(description="Merge duplicate 1Password items within a vault by title.")
        parser.add_argument("--vault", required=True, help="Vault name or ID")
        parser.add_argument("--apply", action="store_true", help="Actually apply changes (default is dry-run).")
        parser.add_argument("--backup-dir", default="./op_backups", help="Directory to store JSON backups.")
        parser.add_argument("--archive-duplicates", action="store_true", help="Archive duplicate items after merging.")
        parser.add_argument("--include-singletons", action="store_true", help="Also back up items that are not duplicates.")
        parser.add_argument("--include-section-values", action="store_true", help="Include raw values of custom-section fields in notes (default redacts sensitive).")
        parser.add_argument("--account", help="1Password account shorthand or sign-in address (optional)")
        args = parser.parse_args()
        global OP_ACCOUNT
        OP_ACCOUNT = args.account
        # Preflight: accept success from either account list or vault list
        try:
            _ = op_json(["account", "list"])
        except Exception as e1:
            try:
                _ = op_json(["vault", "list"])
            except Exception as e2:
                print(
                    "ERROR: 1Password CLI not ready for this process.\n"
                    f"• account list error: {e1}\n"
                    f"• vault list error: {e2}\n"
                    "Tips: pass --account <shorthand or sign-in address>, or ensure PATH/env matches the shell where `op whoami` works.",
                    file=sys.stderr
                )
                sys.exit(1)
        items = list_items(args.vault)
        if not items:
            print(f"No items found in vault '{args.vault}'.")
            return
        groups = build_groups(items)
        dup_groups = {k: v for k, v in groups.items() if k and len(v) > 1}
        print(f"Found {len(items)} items total in vault '{args.vault}'.")
        print(f"Found {len(dup_groups)} duplicate title group(s).")
        backup_root = Path(args.backup_dir)
        backup_root.mkdir(parents=True, exist_ok=True)
        # Optional: back up singletons
        if args.include_singletons:
            singleton_count = 0
            for _, group in groups.items():
                if len(group) == 1:
                    full = get_item(group[0]["id"])
                    backup_item_json(full, backup_root / "singletons")
                    singleton_count += 1
            print(f"Backed up {singleton_count} singleton items.")
        merged_total = 0
        archived_total = 0
        for norm_title, group in sorted(dup_groups.items(), key=lambda x: x[0]):
            # Full JSON for each item in this group
            fulls = [get_item(it["id"]) for it in group]
            for f in fulls:
                backup_item_json(f, backup_root / "duplicates_raw")
            keeper = choose_keeper(group)
            keeper_full = get_item(keeper["id"])
            # Gather unions and merge plans
            all_urls: Set[str] = set(union_urls(keeper_full))
            notes_to_append_parts: List[str] = []
            # ---- TAG MERGE (compute final set) ----
            keeper_tags_list = get_tags_list(keeper_full)
            all_tags_candidates: Set[str] = set(keeper_tags_list)
            others_full = [it for it in fulls if it["id"] != keeper["id"]]
            for it in others_full:
                all_urls |= union_urls(it)
                all_tags_candidates |= union_tags(it)
            final_tags = clean_tags(keeper_tags_list, all_tags_candidates)
            # ---- Loose custom fields merge with conflict logging ----
            keeper_fields_map = extract_loose_custom_fields(keeper_full)
            merged_fields: Dict[str, FieldInfo] = dict(keeper_fields_map)
            for it in others_full:
                it_title = it.get("title") or "(untitled)"
                # Raw notes
                extra_notes = it.get("notesPlain") or ""
                if extra_notes.strip():
                    notes_to_append_parts.append(f"---\nMerged notes from '{it_title}':\n{extra_notes}")
                # Loose custom fields (conflicts -> notes)
                extras = extract_loose_custom_fields(it)
                for canonical, (fid, label, value) in extras.items():
                    if canonical not in merged_fields:
                        merged_fields[canonical] = (fid, label, value)
                    else:
                        _, _, existing_val = merged_fields[canonical]
                        if existing_val != value:
                            notes_to_append_parts.append(
                                f"---\nDuplicate field conflict for '{label or canonical}' from '{it_title}': {value}"
                            )
                # Sectioned custom fields -> summarize to notes
                sec_summary = summarize_sectioned_fields_for_notes(
                    it, include_values=args.include_section_values
                )
                if sec_summary:
                    notes_to_append_parts.append(sec_summary)
            # Compute edit plan vs current keeper
            updates, additions = plan_field_edits(keeper_full, merged_fields)
            urls_to_add = sorted(all_urls - union_urls(keeper_full))
            notes_to_append = "\n\n".join(notes_to_append_parts).strip()
            # ----- Plan summary -----
            print("\n=== MERGE GROUP ===")
            print(f"Title: {keeper_full.get('title')!r}")
            print(f"Keeper: {keeper_full.get('title')} [{keeper_full.get('id')}]")
            dup_list = [f"{o.get('title')} [{o.get('id')}]" for o in others_full]
            print(f"Duplicates: {dup_list}")
            if urls_to_add:
                print(f"Will add URLs to keeper: {urls_to_add}")
            print(f"Will set cleaned tags on keeper via template: {final_tags} (removing duplicates & 'Imported*')")
            if updates:
                print(f"Will update {len(updates)} loose custom field(s).")
            if additions:
                print(f"Will add {len(additions)} loose custom field(s).")
            if notes_to_append:
                print(f"Will append ~{len(notes_to_append)} chars of notes (includes section summaries & conflicts).")
            # 1) Apply non-tag edits
            edit_item_merge(
                keeper["id"],
                urls_to_add,
                notes_to_append,
                updates,
                additions,
                apply=args.apply,
            )
            # 2) Force-set tags via template (authoritative)
            force_set_tags_via_template(keeper["id"], final_tags, apply=args.apply)
            merged_total += 1
            # Archive duplicates (optional)
            if args.archive_duplicates and others_full:
                for o in others_full:
                    archive_item(o, apply=args.apply)
                    archived_total += 1
        # ----- Summary -----
        print("\nSummary:")
        print(f" Duplicate groups processed: {merged_total}")
        if args.archive_duplicates:
            print(f" Duplicates archived: {archived_total}")
        print("\nDone. (Dry-run)" if not args.apply else "\nDone.")

    if __name__ == "__main__":
        main()

    Ok, here is a CLI-based script, in case someone needs it...

    Run a dry run
    ./merge_op_vault_items.py --vault "My Vault"

    Apply and archive
    ./merge_op_vault_items.py --vault "My Vault" --apply --archive-duplicates
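
    If a merge goes wrong, the JSON backups under op_backups can be used to recreate an item. Below is a rough sketch of that, with one assumption I haven't verified on every CLI version: that "op item create" accepts a JSON template file via --template, mirroring the --template flag the merge script uses with "op item edit" (check "op item create --help" first).

    #!/usr/bin/env python3
    """Recreate one item from a JSON backup written by the merge script.

    A sketch only: assumes `op item create --template=<file>` is available
    (check `op item create --help`). Server-assigned fields are stripped from
    the backup before it is used as a template.
    Usage: ./restore_backup.py op_backups/duplicates_raw/Example__abc.json "My Vault"
    """
    import json
    import subprocess
    import sys
    import tempfile

    backup_path, vault = sys.argv[1], sys.argv[2]

    with open(backup_path, encoding="utf-8") as f:
        item = json.load(f)

    # Drop read-only / server-assigned fields; keep title, category, fields, urls, tags, notes.
    for key in ("id", "version", "createdAt", "updatedAt", "lastEditedBy", "vault"):
        item.pop(key, None)

    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False, encoding="utf-8") as tf:
        json.dump(item, tf, ensure_ascii=False)
        template_path = tf.name

    subprocess.run(
        ["op", "item", "create", f"--template={template_path}", "--vault", vault],
        check=True,
    )
    print(f"Recreated '{item.get('title')}' in vault '{vault}'.")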

  • Art
    New Contributor

    I'm afraid there are many more than fourteen :).
    This covers almost my entire password database—hundreds of entries. The problem is the "almost." Some entries are more relevant on Bitwarden, others on Google. And it's not just passwords, but also some additional fields and information.
    Yes, the merge feature can be very helpful in this situation.

  • AJCxZ0
    Silver Expert

    Until 1Password adds a function to merge Items, we have to do so manually.

    Watchtower identifies reused passwords, so it can help surface different Items for the same thing that are not exact duplicates; however, that may not be much more helpful than sorting and searching.

    The desktop client and browser extension don't have the kind of sophisticated filtering which you mention. You could try building your own using the command line interface (CLI), but if you only have twelve (or fourteen if you're a baker) redundant Items, then it's probably not worth the effort.
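
    For anyone who wants to try that, a starting point might look like the sketch below: it groups items by their primary password to surface reuse, assuming the JSON from "op item get" marks that field with purpose "PASSWORD" (verify on one item first), and it prints only titles, never the passwords themselves.

    #!/usr/bin/env python3
    """Group items in a vault by their primary password to spot reuse.

    A sketch, not a Watchtower replacement. Assumes the JSON from
    `op item get` marks the main password field with purpose "PASSWORD"
    (check one item first). Only item titles are printed, never passwords.
    """
    import json
    import subprocess
    from collections import defaultdict

    VAULT = "My Vault"  # hypothetical vault name; adjust to yours

    def op_json(args):
        """Run an `op` command with --format json and parse the output."""
        out = subprocess.run(
            ["op"] + args + ["--format", "json"],
            check=True, capture_output=True,
        ).stdout
        return json.loads(out or "null")

    groups = defaultdict(list)  # password value -> titles of items using it
    for summary in op_json(["item", "list", "--vault", VAULT]) or []:
        full = op_json(["item", "get", summary["id"]]) or {}
        for field in full.get("fields") or []:
            if field.get("purpose") == "PASSWORD" and field.get("value"):
                groups[field["value"]].append(full.get("title") or summary["id"])
                break

    for titles in groups.values():
        if len(titles) > 1:
            print("Same password used by:", ", ".join(sorted(titles)))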